/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk;



import com.bff.gaia.unified.sdk.annotations.Internal;

import com.bff.gaia.unified.sdk.coders.CoderRegistry;

import com.bff.gaia.unified.sdk.io.Read;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.options.PipelineOptionsFactory;

import com.bff.gaia.unified.sdk.runners.AppliedPTransform;

import com.bff.gaia.unified.sdk.runners.PTransformOverride;

import com.bff.gaia.unified.sdk.runners.PTransformOverrideFactory;

import com.bff.gaia.unified.sdk.runners.PTransformOverrideFactory.PTransformReplacement;

import com.bff.gaia.unified.sdk.runners.PTransformOverrideFactory.ReplacementOutput;

import com.bff.gaia.unified.sdk.runners.TransformHierarchy;

import com.bff.gaia.unified.sdk.runners.TransformHierarchy.Node;

import com.bff.gaia.unified.sdk.schemas.SchemaRegistry;

import com.bff.gaia.unified.sdk.transforms.Create;

import com.bff.gaia.unified.sdk.transforms.PTransform;

import com.bff.gaia.unified.sdk.util.UserCodeException;

import com.bff.gaia.unified.sdk.values.*;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.Function;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.Joiner;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.Predicate;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.Predicates;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.*;

import com.bff.gaia.unified.sdk.values.PBegin;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.PInput;

import com.bff.gaia.unified.sdk.values.POutput;

import com.bff.gaia.unified.sdk.values.PValue;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import javax.annotation.Nullable;

import java.util.*;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkNotNull;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkState;

import static com.bff.gaia.unified.vendor.guava.com.google.common.collect.Iterables.transform;



/**

 * A {@link Pipeline} manages a directed acyclic graph of {@link PTransform PTransforms}, and the

 * {@link PCollection PCollections} that the {@link PTransform PTransforms} consume and produce.

 *

 * <p>Each {@link Pipeline} is self-contained and isolated from any other {@link Pipeline}. The

 * {@link PValue PValues} that are inputs and outputs of each of a {@link Pipeline Pipeline's}

 * {@link PTransform PTransforms} are also owned by that {@link Pipeline}. A {@link PValue} owned by

 * one {@link Pipeline} can be read only by {@link PTransform PTransforms} also owned by that {@link

 * Pipeline}. {@link Pipeline Pipelines} can safely be executed concurrently.

 *

 * <p>Here is a typical example of use:

 *

 * <pre>{@code

 * // Start by defining the options for the pipeline.

 * PipelineOptions options = PipelineOptionsFactory.create();

 * // Then create the pipeline. The runner is determined by the options.

 * Pipeline p = Pipeline.create(options);

 *

 * // A root PTransform, like TextIO.Read or Create, gets added

 * // to the Pipeline by being applied:

 * PCollection<String> lines =

 *     p.apply(TextIO.read().from("gs://bucket/dir/file*.txt"));

 *

 * // A Pipeline can have multiple root transforms:

 * PCollection<String> moreLines =

 *     p.apply(TextIO.read().from("gs://bucket/other/dir/file*.txt"));

 * PCollection<String> yetMoreLines =

 *     p.apply(Create.of("yet", "more", "lines").withCoder(StringUtf8Coder.of()));

 *

 * // Further PTransforms can be applied, in an arbitrary (acyclic) graph.

 * // Subsequent PTransforms (and intermediate PCollections etc.) are

 * // implicitly part of the same Pipeline.

 * PCollection<String> allLines =

 *     PCollectionList.of(lines).and(moreLines).and(yetMoreLines)

 *     .apply(new Flatten<String>());

 * PCollection<KV<String, Integer>> wordCounts =

 *     allLines

 *     .apply(ParDo.of(new ExtractWords()))

 *     .apply(new Count<String>());

 * PCollection<String> formattedWordCounts =

 *     wordCounts.apply(ParDo.of(new FormatCounts()));

 * formattedWordCounts.apply(TextIO.write().to("gs://bucket/dir/counts.txt"));

 *

 * // PTransforms aren't executed when they're applied, rather they're

 * // just added to the Pipeline.  Once the whole Pipeline of PTransforms

 * // is constructed, the Pipeline's PTransforms can be run using a

 * // PipelineRunner.  The default PipelineRunner executes the Pipeline

 * // directly, sequentially, in this one process, which is useful for

 * // unit tests and simple experiments:

 * p.run();

 *

 * }</pre>

 */

public class Pipeline {

  private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);

  /**

   * Thrown during execution of a {@link Pipeline}, whenever user code within that {@link Pipeline}

   * throws an exception.

   *

   * <p>The original exception thrown by user code may be retrieved via {@link #getCause}.

   */

  public static class PipelineExecutionException extends RuntimeException {
    // NOTE(review): RuntimeException is Serializable; consider declaring a
    // serialVersionUID if instances may cross serialization boundaries.

    /** Wraps {@code cause} into a {@link PipelineExecutionException}. */
    public PipelineExecutionException(Throwable cause) {
      super(cause);
    }
  }



  /////////////////////////////////////////////////////////////////////////////

  // Public operations.



  /** Constructs a pipeline from default {@link PipelineOptions}. */

  /** Constructs a pipeline from default {@link PipelineOptions}. */
  public static Pipeline create() {
    Pipeline created = new Pipeline(PipelineOptionsFactory.create());
    LOG.debug("Creating {}", created);
    return created;
  }



  /** Constructs a pipeline from the provided {@link PipelineOptions}. */

  /** Constructs a pipeline from the provided {@link PipelineOptions}. */
  public static Pipeline create(PipelineOptions options) {
    // Invoked purely for its side effects on the options.
    // TODO: fix runners that mutate PipelineOptions in this method, then remove this line
    PipelineRunner.fromOptions(options);

    Pipeline created = new Pipeline(options);
    LOG.debug("Creating {}", created);
    return created;
  }



  /**

   * Returns a {@link PBegin} owned by this Pipeline. This serves as the input of a root {@link

   * PTransform} such as {@link Read} or {@link Create}.

   */

  public PBegin begin() {
    // PBegin is the trivial "no input" PInput bound to this pipeline.
    return PBegin.in(this);
  }



  /**

   * Like {@link #apply(String, PTransform)} but the transform node in the {@link Pipeline} graph

   * will be named according to {@link PTransform#getName}.

   *

   * @see #apply(String, PTransform)

   */

  public <OutputT extends POutput> OutputT apply(PTransform<? super PBegin, OutputT> root) {
    // Delegates to PBegin.apply, which names the node via PTransform#getName.
    return begin().apply(root);
  }



  /**

   * Adds a root {@link PTransform}, such as {@link Read} or {@link Create}, to this {@link

   * Pipeline}.

   *

   * <p>The node in the {@link Pipeline} graph will use the provided {@code name}. This name is used

   * in various places, including the monitoring UI, logging, and to stably identify this node in

   * the {@link Pipeline} graph upon update.

   *

   * <p>Alias for {@code begin().apply(name, root)}.

   */

  public <OutputT extends POutput> OutputT apply(
      String name, PTransform<? super PBegin, OutputT> root) {
    // Delegates to PBegin.apply so the root transform is registered with this pipeline.
    return begin().apply(name, root);
  }



  /**
   * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
   *
   * <p>Constructs a {@link Pipeline} wrapping an existing {@link TransformHierarchy} and the given
   * {@link PipelineOptions}.
   */
  @Internal
  public static Pipeline forTransformHierarchy(
      TransformHierarchy transforms, PipelineOptions options) {
    return new Pipeline(transforms, options);
  }



  /**
   * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
   *
   * <p>Returns the {@link PipelineOptions} this pipeline was created with.
   */
  @Internal
  public PipelineOptions getOptions() {
    return defaultOptions;
  }



  /**

   * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>

   *

   * <p>Replaces all nodes that match a {@link PTransformOverride} in this pipeline. Overrides are

   * applied in the order they are present within the list.

   *

   * <p>After all nodes are replaced, ensures that no nodes in the updated graph match any of the

   * overrides.

   */

  @Internal
  public void replaceAll(List<PTransformOverride> overrides) {
    // Apply each override in list order, then verify nothing still matches.
    overrides.forEach(this::replace);
    checkNoMoreMatches(overrides);
  }



  /**
   * Verifies that, after all replacements have been applied, no node in the graph still matches any
   * of the given overrides; {@code checkState} fails with the offending matches otherwise.
   */
  private void checkNoMoreMatches(final List<PTransformOverride> overrides) {
    traverseTopologically(
        new PipelineVisitor.Defaults() {
          // Every node found to still match an override during this traversal.
          SetMultimap<Node, PTransformOverride> matched = HashMultimap.create();

          @Override
          public CompositeBehavior enterCompositeTransform(Node node) {
            if (!node.isRootNode()) {
              checkForMatches(node);
            }
            // Once a composite has matched there is no need to inspect its children.
            if (matched.containsKey(node)) {
              return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
            } else {
              return CompositeBehavior.ENTER_TRANSFORM;
            }
          }

          @Override
          public void leaveCompositeTransform(Node node) {
            // Leaving the root node means the traversal is complete; fail if anything matched.
            if (node.isRootNode()) {
              checkState(
                  matched.isEmpty(), "Found nodes that matched overrides. Matches: %s", matched);
            }
          }

          @Override
          public void visitPrimitiveTransform(Node node) {
            checkForMatches(node);
          }

          // Records every override whose validation matcher still matches this node.
          private void checkForMatches(Node node) {
            for (PTransformOverride override : overrides) {
              if (override
                  .getMatcher()
                  .matchesDuringValidation(node.toAppliedPTransform(getPipeline()))) {
                matched.put(node, override);
              }
            }
          }
        });
  }



  /**
   * Applies a single {@link PTransformOverride}: finds all matching nodes, releases the unique
   * names they (and their descendants) occupied, then replaces each match via its override factory.
   */
  private void replace(final PTransformOverride override) {
    // Nodes whose transform matched the override and must be replaced.
    final Set<Node> matches = new HashSet<>();
    // Nodes whose full names become available again, either because they matched directly
    // or because an enclosing composite matched.
    final Set<Node> freedNodes = new HashSet<>();
    traverseTopologically(
        new PipelineVisitor.Defaults() {
          @Override
          public CompositeBehavior enterCompositeTransform(Node node) {
            if (!node.isRootNode() && freedNodes.contains(node.getEnclosingNode())) {
              // This node will be freed because its parent will be freed.
              freedNodes.add(node);
              return CompositeBehavior.ENTER_TRANSFORM;
            }
            if (!node.isRootNode()
                && override.getMatcher().matches(node.toAppliedPTransform(getPipeline()))) {
              matches.add(node);
              // This node will be freed. When we visit any of its children, they will also be freed
              freedNodes.add(node);
            }
            return CompositeBehavior.ENTER_TRANSFORM;
          }

          @Override
          public void visitPrimitiveTransform(Node node) {
            if (freedNodes.contains(node.getEnclosingNode())) {
              freedNodes.add(node);
            } else if (override.getMatcher().matches(node.toAppliedPTransform(getPipeline()))) {
              matches.add(node);
              freedNodes.add(node);
            }
          }
        });
    // Release names before applying replacements so replacement transforms can reuse them.
    for (Node freedNode : freedNodes) {
      usedFullNames.remove(freedNode.getFullName());
    }
    for (Node match : matches) {
      applyReplacement(match, override.getOverrideFactory());
    }
  }



  /**

   * Runs this {@link Pipeline} according to the {@link PipelineOptions} used to create the {@link

   * Pipeline} via {@link #create(PipelineOptions)}.

   */

  public PipelineResult run() {
    // Run with the options this pipeline was constructed with.
    return run(defaultOptions);
  }



  /**

   * Runs this {@link Pipeline} using the given {@link PipelineOptions}, using the runner specified

   * by the options.

   */

  public PipelineResult run(PipelineOptions options) {
    PipelineRunner<? extends PipelineResult> runner = PipelineRunner.fromOptions(options);
    // Ensure all of the nodes are fully specified before a PipelineRunner gets access to the
    // pipeline.
    LOG.debug("Running {} via {}", this, runner);
    try {
      validate(options);
      return runner.run(this);
    } catch (UserCodeException e) {
      // This serves to replace the stack with one that ends here and
      // is caused by the caught UserCodeException, thereby splicing
      // out all the stack frames in between the PipelineRunner itself
      // and where the worker calls into the user's code.
      // Only UserCodeException is wrapped; all other exceptions propagate unchanged.
      throw new PipelineExecutionException(e.getCause());
    }
  }



  /** Returns the {@link CoderRegistry} that this {@link Pipeline} uses. */
  public CoderRegistry getCoderRegistry() {
    // Lazily create a default registry on first access.
    return coderRegistry == null
        ? (coderRegistry = CoderRegistry.createDefault())
        : coderRegistry;
  }



  /** Returns the {@link SchemaRegistry} that this {@link Pipeline} uses. */
  public SchemaRegistry getSchemaRegistry() {
    // Lazily create a default registry on first access, mirroring getCoderRegistry().
    if (schemaRegistry == null) {
      schemaRegistry = SchemaRegistry.createDefault();
    }
    return schemaRegistry;
  }



  /////////////////////////////////////////////////////////////////////////////

  // Below here are operations that aren't normally called by users.



  /**

   * @deprecated this should never be used - every {@link Pipeline} has a registry throughout its

   *     lifetime.

   */

  @Deprecated
  public void setCoderRegistry(CoderRegistry coderRegistry) {
    // Overwrites any registry previously created by getCoderRegistry().
    this.coderRegistry = coderRegistry;
  }



  /**

   * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>

   *

   * <p>A {@link PipelineVisitor} can be passed into {@link Pipeline#traverseTopologically} to be

   * called for each of the transforms and values in the {@link Pipeline}.

   */

  @Internal
  public interface PipelineVisitor {
    /**
     * Called before visiting any values or transforms, as many uses of a visitor require
     * access to the {@link Pipeline} object itself.
     */
    void enterPipeline(Pipeline p);

    /**
     * Called for each composite transform after all topological predecessors have been visited but
     * before any of its component transforms.
     *
     * <p>The return value controls whether or not child transforms are visited.
     */
    CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node);

    /**
     * Called for each composite transform after all of its component transforms and their outputs
     * have been visited.
     */
    void leaveCompositeTransform(TransformHierarchy.Node node);

    /**
     * Called for each primitive transform after all of its topological predecessors and inputs have
     * been visited.
     */
    void visitPrimitiveTransform(TransformHierarchy.Node node);

    /** Called for each value after the transform that produced the value has been visited. */
    void visitValue(PValue value, TransformHierarchy.Node producer);

    /** Called when all values and transforms in a {@link Pipeline} have been visited. */
    void leavePipeline(Pipeline pipeline);

    /**
     * Control enum for indicating whether or not a traversal should process the contents of a
     * composite transform or not.
     */
    enum CompositeBehavior {
      ENTER_TRANSFORM,
      DO_NOT_ENTER_TRANSFORM
    }

    /**
     * Default no-op {@link PipelineVisitor} that enters all composite transforms. User
     * implementations can override just those methods they are interested in.
     */
    class Defaults implements PipelineVisitor {

      // Set in enterPipeline and cleared in leavePipeline; non-null only during traversal.
      @Nullable
      private Pipeline pipeline;

      /**
       * Returns the {@link Pipeline} currently being traversed.
       *
       * @throws IllegalStateException if called outside an active traversal (i.e. before
       *     {@link #enterPipeline} or after {@link #leavePipeline}).
       */
      protected Pipeline getPipeline() {
        if (pipeline == null) {
          throw new IllegalStateException(
              "Illegal access to pipeline after visitor traversal was completed");
        }
        return pipeline;
      }

      @Override
      public void enterPipeline(Pipeline pipeline) {
        this.pipeline = checkNotNull(pipeline);
      }

      @Override
      public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
        return CompositeBehavior.ENTER_TRANSFORM;
      }

      @Override
      public void leaveCompositeTransform(TransformHierarchy.Node node) {}

      @Override
      public void visitPrimitiveTransform(TransformHierarchy.Node node) {}

      @Override
      public void visitValue(PValue value, TransformHierarchy.Node producer) {}

      @Override
      public void leavePipeline(Pipeline pipeline) {
        this.pipeline = null;
      }
    }
  }



  /**

   * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>

   *

   * <p>Invokes the {@link PipelineVisitor PipelineVisitor's} {@link

   * PipelineVisitor#visitPrimitiveTransform} and {@link PipelineVisitor#visitValue} operations on

   * each of this {@link Pipeline Pipeline's} transform and value nodes, in forward topological

   * order.

   *

   * <p>Traversal of the {@link Pipeline} causes {@link PTransform PTransforms} and {@link PValue

   * PValues} owned by the {@link Pipeline} to be marked as finished, at which point they may no

   * longer be modified.

   *

   * <p>Typically invoked by {@link PipelineRunner} subclasses.

   */

  @Internal
  public void traverseTopologically(PipelineVisitor visitor) {
    // Bracket the hierarchy visit with enter/leavePipeline so Defaults-based
    // visitors have access to this Pipeline via getPipeline().
    visitor.enterPipeline(this);
    transforms.visit(visitor);
    visitor.leavePipeline(this);
  }



  /**

   * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>

   *

   * <p>Like {@link #applyTransform(String, PInput, PTransform)} but defaulting to the name provided

   * by the {@link PTransform}.

   */

  @Internal
  public static <InputT extends PInput, OutputT extends POutput> OutputT applyTransform(
      InputT input, PTransform<? super InputT, OutputT> transform) {
    // Route through the pipeline owning the input, defaulting to the transform's own name.
    return input.getPipeline().applyInternal(transform.getName(), input, transform);
  }



  /**

   * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>

   *

   * <p>Applies the given {@code PTransform} to this input {@code InputT} and returns its {@code

   * OutputT}. This uses {@code name} to identify this specific application of the transform. This

   * name is used in various places, including the monitoring UI, logging, and to stably identify

   * this application node in the {@link Pipeline} graph during update.

   *

   * <p>Each {@link PInput} subclass that provides an {@code apply} method should delegate to this

   * method to ensure proper registration with the {@link PipelineRunner}.

   */

  @Internal
  public static <InputT extends PInput, OutputT extends POutput> OutputT applyTransform(
      String name, InputT input, PTransform<? super InputT, OutputT> transform) {
    // Route through the pipeline owning the input, using the caller-supplied node name.
    return input.getPipeline().applyInternal(name, input, transform);
  }



  /////////////////////////////////////////////////////////////////////////////

  // Below here are internal operations, never called by users.



  /** The DAG of applied transforms; all graph mutations go through this hierarchy. */
  private final TransformHierarchy transforms;

  // Full names currently in use; consulted by uniquifyInternal to keep node names unique,
  // with entries released in replace(...) when nodes are removed. Never reassigned, so final.
  private final Set<String> usedFullNames = new HashSet<>();

  /** Lazily initialized; access via {@link #getCoderRegistry()}. */
  @Nullable
  private CoderRegistry coderRegistry;

  /** Lazily initialized; access via {@link #getSchemaRegistry()}. */
  @Nullable
  private SchemaRegistry schemaRegistry;

  // Every PTransform instance applied under a given (non-uniquified) full name; used by
  // validate(...) to detect transforms lacking stable unique names.
  private final Multimap<String, PTransform<?, ?>> instancePerName = ArrayListMultimap.create();

  /** Options supplied at construction time; used by {@link #run()} and {@link #getOptions()}. */
  private final PipelineOptions defaultOptions;



  /** Creates a pipeline over an existing (possibly pre-populated) transform hierarchy. */
  private Pipeline(TransformHierarchy transforms, PipelineOptions options) {
    this.transforms = transforms;
    this.defaultOptions = options;
  }



  /** Creates a pipeline with a fresh, empty transform hierarchy. */
  protected Pipeline(PipelineOptions options) {
    this(new TransformHierarchy(), options);
  }



  /** Identifies this pipeline by its hash code, e.g. {@code "Pipeline#12345"}. */
  @Override
  public String toString() {
    return String.format("Pipeline#%s", hashCode());
  }



  /**
   * Applies a {@link PTransform} to the given {@link PInput}.
   *
   * @see Pipeline#apply
   */
  private <InputT extends PInput, OutputT extends POutput> OutputT applyInternal(
      String name, InputT input, PTransform<? super InputT, OutputT> transform) {
    String namePrefix = transforms.getCurrent().getFullName();
    // The actual node name: unique within the pipeline (may get a numeric suffix).
    String uniqueName = uniquifyInternal(namePrefix, name);

    // Track the *requested* (possibly duplicated) full name so validate(...) can report
    // transforms that lack stable unique names.
    final String builtName = buildName(namePrefix, name);
    instancePerName.put(builtName, transform);

    LOG.debug("Adding {} to {}", transform, this);
    transforms.pushNode(uniqueName, input, transform);
    try {
      transforms.finishSpecifyingInput();
      OutputT output = transform.expand(input);
      transforms.setOutput(output);

      return output;
    } finally {
      // Always restore the hierarchy cursor, even if expand(...) throws.
      transforms.popNode();
    }
  }



  /**
   * Replaces {@code original} in the hierarchy with the transform produced by
   * {@code replacementFactory}, expanding the replacement and rewiring the original
   * outputs to the replacement's outputs.
   */
  private <
          InputT extends PInput,
          OutputT extends POutput,
          TransformT extends PTransform<? super InputT, OutputT>>
      void applyReplacement(
          Node original,
          PTransformOverrideFactory<InputT, OutputT, TransformT> replacementFactory) {
    PTransformReplacement<InputT, OutputT> replacement =
        replacementFactory.getReplacementTransform(
            // Unchecked cast: the override's matcher is trusted to have selected a node
            // of a compatible transform type.
            (AppliedPTransform<InputT, OutputT, TransformT>) original.toAppliedPTransform(this));
    // Identity replacement: the factory kept the same transform instance, nothing to do.
    if (replacement.getTransform() == original.getTransform()) {
      return;
    }
    InputT originalInput = replacement.getInput();

    LOG.debug("Replacing {} with {}", original, replacement);
    transforms.replaceNode(original, originalInput, replacement.getTransform());
    try {
      OutputT newOutput = replacement.getTransform().expand(originalInput);
      Map<PValue, ReplacementOutput> originalToReplacement =
          replacementFactory.mapOutputs(original.getOutputs(), newOutput);
      // Ensure the internal TransformHierarchy data structures are consistent.
      transforms.setOutput(newOutput);
      transforms.replaceOutputs(originalToReplacement);
    } finally {
      // Always restore the hierarchy cursor, even if expand(...) throws.
      transforms.popNode();
    }
  }



  /**
   * Validates every transform in the graph against {@code options}, then enforces the configured
   * stable-unique-names policy (OFF silently ignores, WARNING logs, ERROR throws).
   */
  @VisibleForTesting
  void validate(PipelineOptions options) {
    this.traverseTopologically(new ValidateVisitor(options));
    // Entries whose name is shared by more than one transform instance, i.e. unstable names.
    final Collection<Map.Entry<String, Collection<PTransform<?, ?>>>> errors =
        Collections2.filter(instancePerName.asMap().entrySet(), Predicates.not(new IsUnique<>()));
    if (!errors.isEmpty()) {
      switch (options.getStableUniqueNames()) {
        case OFF:
          break;
        case WARNING:
          LOG.warn(
              "The following transforms do not have stable unique names: {}",
              Joiner.on(", ").join(transform(errors, new KeysExtractor())));
          break;
        case ERROR: // be very verbose here since it will just fail the execution
          throw new IllegalStateException(
              String.format(
                      "Pipeline update will not be possible"
                          + " because the following transforms do not have stable unique names: %s.",
                      Joiner.on(", ").join(transform(errors, new KeysExtractor())))
                  + "\n\n"
                  + "Conflicting instances:\n"
                  + Joiner.on("\n")
                      .join(transform(errors, new UnstableNameToMessage(instancePerName)))
                  + "\n\nYou can fix it adding a name when you call apply(): "
                  + "pipeline.apply(<name>, <transform>).");
        default:
          // Defensive: unreachable unless a new enum constant is added without handling.
          throw new IllegalArgumentException(
              "Unrecognized value for stable unique names: " + options.getStableUniqueNames());
      }
    }
  }



  /**

   * Returns a unique name for a transform with the given prefix (from enclosing transforms) and

   * initial name.

   */

  private String uniquifyInternal(String namePrefix, String origName) {
    // Try the original name first; on collision retry with origName2, origName3, ...
    String attempt = origName;
    for (int suffix = 2; ; suffix++) {
      String candidate = buildName(namePrefix, attempt);
      if (usedFullNames.add(candidate)) {
        return candidate;
      }
      attempt = origName + suffix;
    }
  }



  /** Builds a name from a "/"-delimited prefix and a name. */
  private String buildName(String namePrefix, String name) {
    if (namePrefix.isEmpty()) {
      return name;
    }
    return namePrefix + "/" + name;
  }



  /** Visitor that invokes {@link PTransform#validate} on every transform in the pipeline. */
  private static class ValidateVisitor extends PipelineVisitor.Defaults {

    private final PipelineOptions options;

    public ValidateVisitor(PipelineOptions options) {
      this.options = options;
    }

    @Override
    public CompositeBehavior enterCompositeTransform(Node node) {
      // The root node has no transform attached; skip validation for it.
      if (node.getTransform() != null) {
        node.getTransform().validate(options);
      }
      return CompositeBehavior.ENTER_TRANSFORM;
    }

    @Override
    public void visitPrimitiveTransform(Node node) {
      node.getTransform().validate(options);
    }
  }



  /** Formats a transform as an indented bullet line for unstable-name error messages. */
  private static class TransformToMessage implements Function<PTransform<?, ?>, String> {
    @Override
    public String apply(final PTransform<?, ?> transform) {
      return "    - " + transform;
    }
  }



  /** Formats one unstable name together with every transform instance registered under it. */
  private static class UnstableNameToMessage
      implements Function<Map.Entry<String, Collection<PTransform<?, ?>>>, String> {
    private final Multimap<String, PTransform<?, ?>> instances;

    private UnstableNameToMessage(final Multimap<String, PTransform<?, ?>> instancePerName) {
      this.instances = instancePerName;
    }

    @Override
    public String apply(final Map.Entry<String, Collection<PTransform<?, ?>>> input) {
      final Collection<PTransform<?, ?>> values = instances.get(input.getKey());
      return "- name="
          + input.getKey()
          + ":\n"
          + Joiner.on("\n").join(transform(values, new TransformToMessage()));
    }
  }



  /** Extracts the map key (the transform's full name) from an entry. */
  private static class KeysExtractor
      implements Function<Map.Entry<String, Collection<PTransform<?, ?>>>, String> {
    @Override
    public String apply(final Map.Entry<String, Collection<PTransform<?, ?>>> input) {
      return input.getKey();
    }
  }



  /** Accepts entries whose value collection holds exactly one element (a stable unique name). */
  private static class IsUnique<K, V> implements Predicate<Map.Entry<K, Collection<V>>> {
    @Override
    public boolean apply(final Map.Entry<K, Collection<V>> input) {
      if (input == null) {
        return false;
      }
      return input.getValue().size() == 1;
    }
  }

}